package com.katesoft.scale4j.rttp.messaging.jdbc.init;

import static com.katesoft.scale4j.rttp.internal.IRttpHazelcastRefs.MESSAGE_STORE_CREATION_LOCK;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashSet;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.perf4j.StopWatch;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.ScriptStatementFailedException;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ILock;
import com.katesoft.scale4j.rttp.internal.AbstractJdbcSchemaCreator;

/**
 * Schema creator for the message store tables. The constructor sets the table prefix to
 * {@code INT_}; the tables checked for are {@code <prefix>MESSAGE} and
 * {@code <prefix>MESSAGE_GROUP}.
 * <p>
 * Creation is guarded by a Hazelcast lock and a Hazelcast-backed boolean flag, both registered
 * under {@code MESSAGE_STORE_CREATION_LOCK}.
 *
 * @author kate2007
 */
public class RttpMessageStoreSchemaCreator extends AbstractJdbcSchemaCreator {
   public RttpMessageStoreSchemaCreator() {
      tablesPrefix = "INT_";
   }

   /**
    * Creates the message store schema if it does not exist yet.
    * <p>
    * Nothing is done when no creation script is configured. Otherwise the Hazelcast lock is
    * tried once via {@code tryLock()}; when it is not obtained the call returns without touching
    * the database. The elapsed time is always logged at debug level, and the log entry states
    * whether this call actually ran the creation script to completion.
    *
    * @param connection
    *           database connection
    * @throws java.sql.SQLException
    *            re-throw any SQLExceptions
    */
   @Override
   public void populate(Connection connection) throws SQLException {
      if (getSpringIntegrationCreationScript() == null) {
         return;
      }

      StopWatch stopWatch = new StopWatch(MESSAGE_STORE_CREATION_LOCK);
      HazelcastInstance instance = springHazelcastBridge.getRunningInstance();

      ILock lock = instance.getLock(MESSAGE_STORE_CREATION_LOCK);
      logger.info("using %s and dialect %s for message store schema creation", lock.toString(),
               dialect.toString());

      // stays false when the lock is not obtained, creation is skipped, or the script fails/throws
      boolean created = false;
      try {
         if (lock.tryLock()) {
            try {
               created = createMessageStoreSchemaIfMissing(connection);
            } finally {
               lock.unlock();
            }
         }
      } finally {
         stopWatch.stop();
         if (created) {
            logger.debug("message store schema created in %s milisecs using %s",
                     stopWatch.getElapsedTime(), connection);
         } else {
            // previously the "created" message was logged on every path, which was misleading
            logger.debug(
                     "message store schema was not created by this call, %s milliseconds elapsed using %s",
                     stopWatch.getElapsedTime(), connection);
         }
      }
   }

   /**
    * Runs the creation script unless the tables already exist or the shared creation flag is
    * already set. Expected to be called while holding the creation lock.
    *
    * @param connection
    *           database connection
    * @return {@code true} only when the creation script was executed without a
    *         {@link ScriptStatementFailedException}
    * @throws SQLException
    *            propagated from the table check or from script execution
    */
   private boolean createMessageStoreSchemaIfMissing(Connection connection) throws SQLException {
      if (tablesCreated(connection)) {
         return false;
      }
      if (springHazelcastBridge.getAtomicBoolean(MESSAGE_STORE_CREATION_LOCK).get()) {
         return false;
      }

      logger.info("message store schema is not created, will create using connection %s",
               connection);
      // NOTE(review): the flag is raised before the script runs and is not reset when the script
      // fails, so a failed attempt appears to block later retries - confirm this is intended.
      springHazelcastBridge.updateAtomicBoolean(MESSAGE_STORE_CREATION_LOCK, true);
      super.setScripts(new Resource[] { getSpringIntegrationCreationScript() });
      try {
         super.populate(connection);
         return true;
      } catch (ScriptStatementFailedException e) {
         // best effort: a failing script statement is reported but not propagated to the caller
         logger.warn("unable to create message store schema for dialect %s, exception = %s",
                  dialect.getSimpleName(), ExceptionUtils.getFullStackTrace(e));
         return false;
      }
   }

   /**
    * @return names of the message store tables, built from the current table prefix, in
    *         insertion order
    */
   @Override
   protected Collection<String> tables() {
      Collection<String> tables = new LinkedHashSet<String>();
      tables.add(tablesPrefix + "MESSAGE");
      tables.add(tablesPrefix + "MESSAGE_GROUP");
      return tables;
   }
}
